#!/usr/bin/env python3
# Copyright 2019-2023 Alibaba Group Holding Limited.
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

from sh import Command
import sys
from glob import glob
import os
from sh import rpm, grep, lsmod, grubby, rpm2cpio, echo, cpio, awk, modinfo, yum, reboot, uname
import logging
import colorlog

handler = logging.StreamHandler()
handler.setFormatter(colorlog.ColoredFormatter(
    '%(cyan)s%(asctime)s%(reset)s %(log_color)s%(levelname)s %(white)s%(message)s%(reset)s',
    datefmt='%Y-%m-%d %H:%M:%S'))
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(handler)
logging.getLogger().addHandler(logging.FileHandler('/var/log/reboot-test'))

runonce = Command('test_reboot/runonce')

class TestReboot:
    def setup_class(self, step, alter_ver=''):
        self.step = int(step)
        self.alter_ver = alter_ver

    def get_rpm(self):
        scheduler_rpm = glob(os.path.join('/tmp/work', 'scheduler*.rpm'))
        if len(scheduler_rpm) != 1:
            print("Please check your scheduler rpm");
            sys.exit(1)
        return scheduler_rpm

    def get_kernel_from_rpm(self):
        rpm = self.get_rpm()
        echo('*/scheduler.ko', _out='pattern.txt')
        cpio(rpm2cpio(rpm, _piped=True),
             to_stdout=True, extract=True, pattern_file='pattern.txt',
             _out='scheduler.ko')
        return awk(modinfo('scheduler.ko'), '/vermagic/{print $2}').strip()

    def error_handler(self):
        print("Reboot test " + "\033[31mFAILED\033[0m")
        self.teardown_class()
        raise

    def check_scheduler_ver(self, expected):
        curr = uname(kernel_release=True).strip()
        if expected != curr:
            self.error_handler()

    def install_alternative_kernel(self):
        curr = uname(kernel_release=True).strip()
        uname_noarch = curr[:curr.rfind('.')]
        arch = uname(hardware_platform=True).strip()

        installed_kernel = yum.list.installed('kernel', showduplicates=True, color='never', enablerepo='Plus')
        available_kernel = yum.list('kernel', showduplicates=True, color='never', enablerepo='Plus')
        installed_vers = awk(installed_kernel, '/^kernel/{print $2}').splitlines()
        available_vers = awk(available_kernel, '/^kernel/{print $2}').splitlines()

        if len(installed_vers) >= 2:
            installed_vers.remove(uname_noarch)
            return '%s.%s' % (installed_vers[0], arch)
        else:
            available_vers.remove(uname_noarch)
            yum.install('kernel-%s' % available_vers[0], assumeyes=True)
            return '%s.%s' % (available_vers[0], arch)

    def change_kernel(self, ver):
        vmlinuz = '/boot/vmlinuz-%s' % ver
        grubby(set_default=vmlinuz)

    def test_all(self):
        if self.step == 0:
            logging.info("NOTE this test runs in the background, please check /var/log/reboot-test")
            # check kernel, install, check ko, change kernel, reboot
            logging.info("Running Test Reboot #1")
            self.check_scheduler_ver(self.get_kernel_from_rpm())
            rpm(self.get_rpm(), install=True)
            grep(lsmod(), 'scheduler', word_regexp=True)
            alter_ver = self.install_alternative_kernel()
            self.change_kernel(alter_ver)
            runonce('test_reboot/assert %d %s' % (self.step + 1, alter_ver))
        elif self.step == 1:
            # check kernel, remove, install, change back kernel, reboot
            logging.info("Running Test Reboot #2")
            self.check_scheduler_ver(self.alter_ver)
            rpm('scheduler-xxx', erase=True)
            rpm(self.get_rpm(), install=True)
            self.change_kernel(self.get_kernel_from_rpm())
            runonce('test_reboot/assert %d' % (self.step + 1))
        elif self.step == 2:
            # check kernel, check ko, remove
            logging.info("Running Test Reboot #3")
            self.check_scheduler_ver(self.get_kernel_from_rpm())
            grep(lsmod(), 'scheduler', word_regexp=True)
            rpm('scheduler-xxx', erase=True)
            return
        else:
            return

        reboot()

    def teardown_class(self):
        if grep(lsmod(), 'scheduler', word_regexp=True, _ok_code=[0,1]).exit_code == 0:
            rpm('scheduler-xxx', erase=True)

if __name__ == '__main__':
    test_unit = TestReboot()
    if len(sys.argv) == 1:
        args = [0]
    elif len(sys.argv) >= 2:
        args = sys.argv[1:]

    test_unit.setup_class(*args)
    test_unit.test_all()
    test_unit.teardown_class()
